Lab 11: MapReduce

Introduction

In this lab, we're going to look at MapReduce with Apache Spark. At the end of the lab, you should be able to:

  • Create a local SparkContext object.
  • Use Spark to count words in a document with MapReduce.

Getting started

Let's start by importing the packages we'll need. This week we're going to use pyspark, a Python package that wraps Apache Spark and makes its functionality available in Python. We'll also use Python's built-in urllib2 module to load data via HTTP, as well as the English language stop words list from scikit-learn.


In [ ]:
import pyspark
import urllib2

from sklearn.feature_extraction import text

First, let's initialise a SparkContext object, which will represent our connection to the Spark cluster. To do this, we must first specify the URL of the master node to connect to. As we're only running this notebook for demonstration purposes, we can just run the cluster locally, as follows:


In [ ]:
sc = pyspark.SparkContext(master='local[*]')

Note: By specifying master='local[*]', we are instructing Spark to run with as many worker threads as there are logical cores available on the host machine. Alternatively, we could directly specify the number of threads, e.g. master='local[4]' to run four threads. However, we need to make sure to specify at least two threads, so that there is one available for resource management and at least one available for data processing.
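
For illustration, a context with a fixed number of threads could be created in the same way (commented out below, since only one SparkContext can be active at a time). We can also check how many partitions Spark will use by default for the context we just created:


In [ ]:
# Illustration only (only one SparkContext can be active at a time):
# sc = pyspark.SparkContext(master='local[4]')  # four worker threads

sc.defaultParallelism  # Default number of partitions for the context created above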

Next, let's load the data. This week, we'll load data from a URL resource. We can do this using Python's built-in urllib2 module. First, let's specify the URL of the resource:


In [ ]:
url = 'https://www.gutenberg.org/files/100/100-0.txt'  # The Complete Works of Shakespeare

We can now load the data at the given URL using urllib2 (this might take a little while to load, depending on the speed of your internet connection), as follows:


In [ ]:
data = urllib2.urlopen(url).read()
data = data.splitlines()  # Split the text into a list of lines

data[:10]  # Print the first ten lines

We can load variables from our local Python kernel into our Spark cluster using the parallelize method of the SparkContext we have created, like this:


In [ ]:
rdd = sc.parallelize(data)  # Create a resilient distributed dataset (RDD) from the data

We can examine the first few entries of the RDD using its take method:


In [ ]:
rdd.take(10)  # Take the first ten entries

As can be seen, the data inside the Spark RDD is equivalent to our earlier text data.
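
If we want to convince ourselves of this, a quick check (just a sketch) is to compare the RDD's first few entries with the lines we loaded earlier; parallelize preserves the order of the input list, so the two should match:


In [ ]:
rdd.take(10) == data[:10]  # Should be True: the RDD preserves the order of the original lines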

Counting words

Let's use Spark to carry out a simple word count on our text data using MapReduce. As our RDD consists of a list of lines, we should first split the lines into words. We can use the flatMap method of the RDD to do this. The flatMap method maps a given function to each element in the RDD (e.g. the lines of the text) and then flattens the result. In our case, we want to split each line of our RDD into a list of words (i.e. map some word splitting function) and then create a single list of all of the words in the RDD (i.e. flatten the result).
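
For illustration, we could preview the effect of this step on its own before building the full pipeline:


In [ ]:
words = rdd.flatMap(lambda line: line.split())  # Split each line into words and flatten the result
words.take(10)                                  # Preview the first ten words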

Once we've got an RDD containing a list of all the words in the text, it might be useful to convert each word to lowercase, so that the word 'Thou' is treated the same as the word 'thou'. We can do this using the map method, which applies a given function to each element in an RDD.

Then, for each word in the RDD, we can create a key-value pair, whose key consists of the word and whose value consists of just the number one, e.g. ('thou', 1). As this is an operation that must be applied to each element in the RDD, we can again use the map method.
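
Again, purely as an optional preview, we could inspect a few of these lowercased key-value pairs before reducing them:


In [ ]:
pairs = (rdd.flatMap(lambda line: line.split())  # Split each line into words, flatten the result
            .map(lambda word: word.lower())      # Make each word lowercase
            .map(lambda word: (word, 1)))        # Pair each word with the count one
pairs.take(5)                                    # Preview the first five key-value pairs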

Finally, we can apply a function to reduce the values for each key using the reduceByKey method of the RDD. This method iteratively applies a given function to pairs of values that have been generated for a given key until just one value remains. In our case, we want to sum the counts of each individual word, so we can supply the function lambda x, y: x + y.


In [ ]:
word_counts = (                             # Using parentheses allows inline comments like this
    rdd.flatMap(lambda line: line.split())  # Split each line into words, flatten the result
       .map(lambda word: word.lower())      # Make each word lowercase
       .map(lambda word: (word, 1))         # Map each word to a key-value pair
       .reduceByKey(lambda x, y: x + y)     # Reduce pairs of values until just one remains for each key
)

The result of the set of operations above is another RDD:


In [ ]:
word_counts

This is because Spark computations are lazy, i.e. operations are appended to a computation graph but not carried out until a later point in time. The advantage of this is that we can continue to append operations to an RDD until we are ready to compute the final result. At that point, Spark can optimise the calculation, for example by reordering or combining steps, which can reduce the total amount of computation that needs to be done.
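
If we're curious, we can peek at the computation graph Spark has recorded so far using the RDD's toDebugString method (this only describes the planned operations; nothing is computed yet):


In [ ]:
print(word_counts.toDebugString())  # Show the chain of transformations Spark has queued up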

If we need to, we can check the results of the operation using the take method:


In [ ]:
word_counts.take(5)

As can be seen, Spark has generated counts for each of the words in the text. Next, let's sort the word counts so that the most commonly used words come first. To do this, we can use the sortBy method of the RDD, which applies a given function to each element of the RDD (here, each key-value pair, or kvp) and sorts the elements by the value it returns.

In our case, we want to sort the entries according to the value of each key-value pair (i.e. the word count), so we can pass the inline function lambda kvp: kvp[1] to instruct Spark to sort by the value of each pair (lambda kvp: kvp[0] would sort by the key). We'll also pass the optional keyword argument ascending=False so that Spark sorts in descending order (most common words first) rather than ascending order (the default).


In [ ]:
top = word_counts.sortBy(lambda kvp: kvp[1], ascending=False)

top.take(5)

This is good, but the most common words are simply the most commonly used words in English (no surprise!). Let's filter these out using the list of English language stop words included in scikit-learn (recall Lab 05). We can do this using the filter method of the RDD, which keeps only the entries for which a given function returns True.

In our case, we want to keep only the words that are not in the stop words list, so we can pass the inline function lambda kvp: kvp[0] not in text.ENGLISH_STOP_WORDS, which returns True when kvp[0] (the key, i.e. the word in the key-value pair) is not in the stop words list and False otherwise.


In [ ]:
top = top.filter(lambda kvp: kvp[0] not in text.ENGLISH_STOP_WORDS)

top.take(10)

If we were processing a very large text document, it would make more sense to perform the filtering at the start of the process, rather than at the end, in order to avoid the computation involved in mapping and reducing words that we already know are on the stop words list. In such a case, we could simply write the entire algorithm as:


In [ ]:
word_counts = rdd.flatMap(lambda line: line.split()) \
                 .map(lambda word: word.lower()) \
                 .filter(lambda word: word not in text.ENGLISH_STOP_WORDS) \
                 .map(lambda word: (word, 1)) \
                 .reduceByKey(lambda x, y: x + y) \
                 .sortBy(lambda kvp: kvp[1], ascending=False)

word_counts.take(10)
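
Finally, when we've finished with the cluster, it's good practice to shut down the SparkContext to release the resources held by the local worker threads:


In [ ]:
sc.stop()  # Shut down the SparkContext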